/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jctools.queues.atomic;

import org.jctools.queues.MessagePassingQueue;
import org.jctools.util.PortableJvmInfo;
import org.jctools.util.Pow2;

import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscAtomicArrayQueueL0Pad<E> extends AbstractQueue<E>
{

    long p00, p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16;

}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscAtomicArrayQueueActiveCycleIdField<E> extends MpscAtomicArrayQueueL0Pad<E>
{

    private static final AtomicLongFieldUpdater<MpscAtomicArrayQueueActiveCycleIdField>
        ACTIVE_CYCLE_ID_UPDATER =
        AtomicLongFieldUpdater.newUpdater(MpscAtomicArrayQueueActiveCycleIdField.class, "activeCycleId");

    private volatile long activeCycleId;

    public static int activeCycleIndex(long activeCycleId)
    {
        return (int) (activeCycleId & 1);
    }

    public final long lvActiveCycleId()
    {
        return activeCycleId;
    }

    public final void soActiveCycleId(long value)
    {
        ACTIVE_CYCLE_ID_UPDATER.lazySet(this, value);
    }

    public final boolean casActiveCycleId(long expected, long value)
    {
        return ACTIVE_CYCLE_ID_UPDATER.compareAndSet(this, expected, value);
    }

}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscRelaxedAtomicArrayQueueMidPad<E> extends MpscAtomicArrayQueueActiveCycleIdField<E>
{

    long p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16, p17;

}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscRelaxedAtomicArrayQueueProducerLimitField<E> extends MpscRelaxedAtomicArrayQueueMidPad<E>
{

    private static final AtomicLongFieldUpdater<MpscRelaxedAtomicArrayQueueProducerLimitField> P_LIMIT_UPDATER =
        AtomicLongFieldUpdater.newUpdater(MpscRelaxedAtomicArrayQueueProducerLimitField.class, "producerLimit");

    private volatile long producerLimit;

    protected final long lvProducerLimit()
    {
        return producerLimit;
    }

    protected final void soProducerLimit(long newValue)
    {
        P_LIMIT_UPDATER.lazySet(this, newValue);
    }
}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscRelaxedAtomicArrayQueueL2Pad<E> extends MpscRelaxedAtomicArrayQueueProducerLimitField<E>
{

    long p00, p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16;

}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscAtomicArrayQueueConsumerPositionField<E> extends MpscRelaxedAtomicArrayQueueL2Pad<E>
{

    private static final AtomicLongFieldUpdater<MpscAtomicArrayQueueConsumerPositionField> C_POS_UPDATER =
        AtomicLongFieldUpdater.newUpdater(MpscAtomicArrayQueueConsumerPositionField.class, "consumerPosition");

    protected volatile long consumerPosition;

    protected final long lvConsumerPosition()
    {
        return consumerPosition;
    }

    protected void soConsumerPosition(long newValue)
    {
        C_POS_UPDATER.lazySet(this, newValue);
    }
}

/**
 * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
 * which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
 */
abstract class MpscRelaxedAtomicArrayQueueL3Pad<E> extends MpscAtomicArrayQueueConsumerPositionField<E>
{

    long p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16, p17;

}

public final class MpscRelaxedAtomicArrayQueue<E> extends MpscRelaxedAtomicArrayQueueL3Pad<E>
    implements MessagePassingQueue<E>
{

    //Used internally to manipulate AtomicLongArray with a pre/post PAD to avoid false sharing with the surrounding heap
    private static final class AtomicLongArrayAccess
    {

        private static final int ARRAY_PAD = (PortableJvmInfo.CACHE_LINE_SIZE * 2) / (Long.SIZE / Byte.SIZE);

        private static AtomicLongArray allocate(int size)
        {
            return new AtomicLongArray(size + (ARRAY_PAD * 2));
        }

        private static int calcValueOffset(int index)
        {
            return ARRAY_PAD + index;
        }

        protected static long lvValue(AtomicLongArray elements, int index)
        {
            return elements.get(calcValueOffset(index));
        }

        protected static void soValue(AtomicLongArray elements, int index, long value)
        {
            elements.lazySet(calcValueOffset(index), value);
        }

        protected static long getAndIncrementValue(AtomicLongArray elements, int index)
        {
            return elements.getAndIncrement(calcValueOffset(index));
        }
    }

    private final long mask;
    private final int capacity;
    private final int cycleLength;
    private final int cycleLengthLog2;
    private final AtomicReferenceArray<E> buffer;
    private final AtomicLongArray producerCycleClaim;
    private final int positionOnCycleMask;
    private final int cycleIdBitShift;
    private final long maxCycleId;

    public MpscRelaxedAtomicArrayQueue(int capacity)
    {
        this.buffer = new AtomicReferenceArray<E>(Pow2.roundToPowerOfTwo(capacity) * 2);
        this.soConsumerPosition(0);
        this.soActiveCycleId(0);
        //put a pre/post pad to avoid false sharing with the rest of the heap
        this.producerCycleClaim = AtomicLongArrayAccess.allocate(2);
        this.capacity = this.buffer.length();
        this.mask = this.capacity - 1;
        this.cycleLength = this.capacity / 2;
        this.soProducerLimit(this.cycleLength);
        this.cycleLengthLog2 = Integer.numberOfTrailingZeros(this.cycleLength);
        //it allows at least 1L << 28 = 268435456 overclaims of the position within a cycle while waiting a rotation to complete:
        //this would help to increase the cycleId domain with small capacity
        this.cycleIdBitShift =
            Math.min(32, Integer.numberOfTrailingZeros(Pow2.roundToPowerOfTwo(this.cycleLength + (1 << 28))));
        //it is the max position on cycle too
        this.positionOnCycleMask = (int) ((1L << this.cycleIdBitShift) - 1);
        this.maxCycleId = (1L << (Long.SIZE - this.cycleIdBitShift)) - 1;
    }

    @Override
    public Iterator<E> iterator()
    {
        throw new UnsupportedOperationException();
    }

    private static int positionOnCycle(long producerCycleClaim, int positionOnCycleMask)
    {
        return (int) (producerCycleClaim & positionOnCycleMask);
    }

    private static long cycleId(long producerCycleClaim, int cycleIdBitShift)
    {
        return (producerCycleClaim >>> cycleIdBitShift);
    }

    private static long producerPosition(long cycleId, int positionWithinCycle, int cycleLengthLog2)
    {
        return (cycleId << cycleLengthLog2) + positionWithinCycle;
    }

    private static int calcElementOffset(int cycle, int positionWithinCycle, int cycleLengthLog2)
    {
        return (cycle << cycleLengthLog2) + positionWithinCycle;
    }

    private static int calcElementOffset(long position, long mask)
    {
        return (int) (position & mask);
    }

    private boolean isBackPressured(long producerPosition)
    {
        final long consumerPosition = lvConsumerPosition();
        final long claimLimit = consumerPosition + this.cycleLength;
        if (producerPosition < claimLimit)
        {
            //update the cached value: no backpressure
            soProducerLimit(claimLimit);
            return false;
        }
        else
        {
            return true;
        }
    }

    private void rotateCycle(
        AtomicLongArray producerCycleClaims,
        int activeCycle,
        long producerCycleClaim,
        int cycleIdBitShift,
        long maxCycleId)
    {
        final long cycleId = cycleId(producerCycleClaim, cycleIdBitShift);
        if (cycleId >= maxCycleId)
        {
            throw new IllegalStateException("exhausted cycle id space!");
        }
        final long nextCycleId = cycleId + 1;
        //it points at the beginning of the next cycle
        final long nextProducerCycleClaim = nextCycleId << cycleIdBitShift;
        final int nextActiveCycle = (activeCycle + 1) & 1;
        //it needs to be atomic: a producer fallen behind could claim it concurrently
        AtomicLongArrayAccess.soValue(producerCycleClaims, nextActiveCycle, nextProducerCycleClaim);
        //from now on a slow producer could move the claim and lead to another rotation: a cas is needed to detect it
        if (!casActiveCycleId(cycleId, nextCycleId))
        {
            throw new IllegalStateException("slow rotation due to producer thread starvation!");
        }
    }

    private void validateSlowProducerClaim(final long cycleId, int positionOnCycle, int cycleLengthLog2)
    {
        final long producerPosition = producerPosition(cycleId, positionOnCycle, cycleLengthLog2);
        final long claimLimit = lvProducerLimit();
        if (producerPosition >= claimLimit)
        {
            //it is really backpressured?
            if (isBackPressured(producerPosition))
            {
                throw new IllegalStateException(
                    "the producer has fallen behind: please enlarge the capacity or reduce the number of producers!");
            }
        }
    }

    @Override
    public boolean offer(E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }
        //offer can fail only when backpressured, otherwise it retries
        final int positionOnCycleMask = this.positionOnCycleMask;
        final int cycleLengthLog2 = this.cycleLengthLog2;
        final int cycleLength = this.cycleLength;
        final int cycleIdBitShift = this.cycleIdBitShift;
        final AtomicReferenceArray<E> buffer = this.buffer;
        final AtomicLongArray producerCycleClaims = this.producerCycleClaim;
        final long maxCycleId = this.maxCycleId;
        final long maxPositionOnCycle = positionOnCycleMask;

        while (true)
        {
            final int activeCycle = activeCycleIndex(lvActiveCycleId());
            final long producerActiveCycleClaim = AtomicLongArrayAccess.lvValue(producerCycleClaims, activeCycle);
            final int positionOnActiveCycle = positionOnCycle(producerActiveCycleClaim, positionOnCycleMask);
            final long activeCycleId = cycleId(producerActiveCycleClaim, cycleIdBitShift);
            final long producerPosition = producerPosition(activeCycleId, positionOnActiveCycle, cycleLengthLog2);
            final long claimLimit = lvProducerLimit();
            if (producerPosition >= claimLimit)
            {
                //it is really backpressured?
                if (isBackPressured(producerPosition))
                {
                    return false;
                }
            }
            //try to claim on the active cycle
            final long producerCycleClaim =
                AtomicLongArrayAccess.getAndIncrementValue(producerCycleClaims, activeCycle);
            final int positionOnCycle = positionOnCycle(producerCycleClaim, positionOnCycleMask);
            if (positionOnCycle >= maxPositionOnCycle)
            {
                throw new IllegalStateException(
                    "too many over-claims: please enlarge the capacity or reduce the number of producers!");
            }
            if (positionOnCycle < cycleLength)
            {
                final long cycleId = cycleId(producerCycleClaim, cycleIdBitShift);
                if (cycleId != activeCycleId)
                {
                    validateSlowProducerClaim(cycleId, positionOnCycle, cycleLengthLog2);
                }
                //simplified operation to calculate the offset in the array: activeCycle hasn't changed from the beginning of the offer!
                final int producerOffset = calcElementOffset(activeCycle, positionOnCycle, cycleLengthLog2);
                buffer.lazySet(producerOffset, e);
                return true;
            }
            else if (positionOnCycle == cycleLength)
            {
                //is the only one responsible to rotate cycle: the other producers will be forced to wait until rotation got completed to perform a valid offer
                rotateCycle(producerCycleClaims, activeCycle, producerCycleClaim, cycleIdBitShift, maxCycleId);
            }
        }
    }

    @Override
    public E poll()
    {
        final long consumerPosition = lvConsumerPosition();
        final int offset = calcElementOffset(consumerPosition, this.mask);
        final AtomicReferenceArray<E> buffer = this.buffer;
        E e;
        if ((e = buffer.get(offset)) != null)
        {
            //can be used a memory_order_relaxed set here, because the consumer position write release the buffer value
            buffer.lazySet(offset, null);
            //consumer position allows the producers to move the claim limit (aka reduce backpressure)
            //hence can be set only after the buffer slot release
            soConsumerPosition(consumerPosition + 1);
            return e;
        }
        else
        {
            return pollMaybeEmpty(buffer, offset, consumerPosition);
        }
    }

    private E pollMaybeEmpty(AtomicReferenceArray<E> buffer, final int offset, final long consumerPosition)
    {
        final int activeCycleIndex = activeCycleIndex(lvActiveCycleId());
        final long producerCycleClaim = AtomicLongArrayAccess.lvValue(this.producerCycleClaim, activeCycleIndex);
        final long producerPosition = producerPosition(
            cycleId(producerCycleClaim, this.cycleIdBitShift),
            positionOnCycle(producerCycleClaim, this.positionOnCycleMask),
            this.cycleLengthLog2);
        if (producerPosition == consumerPosition)
        {
            return null;
        }
        else
        {
            E e;
            while ((e = buffer.get(offset)) == null)
            {

            }
            //can be used a memory_order_relaxed set here, because the consumer position write release the buffer value
            buffer.lazySet(offset, null);
            //consumer position allows the producers to move the claim limit (aka reduce backpressure)
            //hence can be set only after the buffer slot release
            soConsumerPosition(consumerPosition + 1);
            return e;
        }
    }

    @Override
    public E peek()
    {
        final long consumerPosition = lvConsumerPosition();
        final int offset = calcElementOffset(consumerPosition, this.mask);
        E e = this.buffer.get(offset);
        if (e != null)
        {
            return e;
        }
        final int activeCycleIndex = activeCycleIndex(lvActiveCycleId());
        final long producerCycleClaim = AtomicLongArrayAccess.lvValue(this.producerCycleClaim, activeCycleIndex);
        final long producerPosition = producerPosition(
            cycleId(producerCycleClaim, this.cycleIdBitShift),
            positionOnCycle(producerCycleClaim, this.positionOnCycleMask),
            this.cycleLengthLog2);
        if (producerPosition == consumerPosition)
        {
            return null;
        }
        else
        {
            while ((e = buffer.get(offset)) == null)
            {

            }
            return e;
        }
    }

    @Override
    public int size()
    {
        final int positionOnCycleMask = this.positionOnCycleMask;
        final int cycleLength = this.cycleLength;
        final AtomicLongArray producerCycleClaim = this.producerCycleClaim;
        long after = lvConsumerPosition();
        int positionOnCycle;
        long producerClaim;
        long before;
        do
        {
            before = after;
            final int activeClaim = activeCycleIndex(lvActiveCycleId());
            producerClaim = AtomicLongArrayAccess.lvValue(producerCycleClaim, activeClaim);
            after = lvConsumerPosition();
            positionOnCycle = positionOnCycle(producerClaim, positionOnCycleMask);
        }
        while (positionOnCycle > cycleLength || before != after);
        //need to have a stable consumer and a valid claim (into the cycle)
        final long size =
            producerPosition(cycleId(producerClaim, this.cycleIdBitShift), positionOnCycle, this.cycleLengthLog2) -
                after;
        if (size > Integer.MAX_VALUE)
        {
            return Integer.MAX_VALUE;
        }
        else
        {
            return (int) size;
        }
    }

    @Override
    public void clear()
    {
        while (poll() != null)
        {
            // if you stare into the void
        }
    }

    @Override
    public boolean isEmpty()
    {
        return size() == 0;
    }

    @Override
    public int capacity()
    {
        return this.cycleLength;
    }

    @Override
    public boolean relaxedOffer(E e)
    {
        return offer(e);
    }

    @Override
    public E relaxedPoll()
    {
        final long consumerPosition = lvConsumerPosition();
        final int offset = calcElementOffset(consumerPosition, this.mask);
        final AtomicReferenceArray<E> buffer = this.buffer;
        E e;
        if ((e = buffer.get(offset)) != null)
        {
            //can be used a memory_order_relaxed set here, because the consumer position write release the buffer value
            buffer.lazySet(offset, null);
            //consumer position allows the producers to move the claim limit (aka reduce backpressure)
            //hence can be set only after the buffer slot release
            soConsumerPosition(consumerPosition + 1);
        }
        return e;
    }

    @Override
    public E relaxedPeek()
    {
        final long consumerPosition = lvConsumerPosition();
        final int offset = calcElementOffset(consumerPosition, this.mask);
        return this.buffer.get(offset);
    }

    @Override
    public int drain(Consumer<E> c)
    {
        return drain(c, capacity());
    }

    @Override
    public int fill(Supplier<E> s)
    {
        long result = 0;// result is a long because we want to have a safepoint check at regular intervals
        final int capacity = capacity();
        do
        {
            final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
            if (filled == 0)
            {
                return (int) result;
            }
            result += filled;
        }
        while (result <= capacity);
        return (int) result;
    }

    @Override
    public int drain(Consumer<E> c, int limit)
    {
        final long mask = this.mask;
        final AtomicReferenceArray<E> buffer = this.buffer;
        for (int i = 0; i < limit; i++)
        {
            final long consumerPosition = lvConsumerPosition();
            final int offset = calcElementOffset(consumerPosition, mask);
            E e;
            if ((e = buffer.get(offset)) != null)
            {
                //can be used a memory_order_relaxed set here, because the consumer position write release the buffer value
                buffer.lazySet(offset, null);
                //consumer position allows the producers to move the claim limit (aka reduce backpressure)
                //hence can be set only after the buffer slot release
                soConsumerPosition(consumerPosition + 1);
                c.accept(e);
            }
            else
            {
                return i;
            }
        }
        return limit;
    }

    @Override
    public int fill(Supplier<E> s, final int limit)
    {
        final int positionOnCycleMask = this.positionOnCycleMask;
        final int cycleLengthLog2 = this.cycleLengthLog2;
        final int cycleLength = this.cycleLength;
        final int cycleIdBitShift = this.cycleIdBitShift;
        final AtomicReferenceArray<E> buffer = this.buffer;
        final AtomicLongArray producerCycleClaims = this.producerCycleClaim;
        final long maxCycleId = this.maxCycleId;
        final long maxPositionOnCycle = positionOnCycleMask;
        int i = 0;
        while (i < limit)
        {
            final int activeCycle = activeCycleIndex(lvActiveCycleId());
            final long producerActiveCycleClaim = AtomicLongArrayAccess.lvValue(producerCycleClaims, activeCycle);
            final int positionOnActiveCycle = positionOnCycle(producerActiveCycleClaim, positionOnCycleMask);
            final long activeCycleId = cycleId(producerActiveCycleClaim, cycleIdBitShift);
            final long producerPosition = producerPosition(activeCycleId, positionOnActiveCycle, cycleLengthLog2);
            final long claimLimit = lvProducerLimit();
            if (producerPosition >= claimLimit)
            {
                //it is really backpressured?
                if (isBackPressured(producerPosition))
                {
                    return i;
                }
            }
            //try to claim on the active cycle
            final long producerCycleClaim =
                AtomicLongArrayAccess.getAndIncrementValue(producerCycleClaims, activeCycle);
            final int positionOnCycle = positionOnCycle(producerCycleClaim, positionOnCycleMask);
            if (positionOnCycle >= maxPositionOnCycle)
            {
                throw new IllegalStateException(
                    "too many over-claims: please enlarge the capacity or reduce the number of producers!");
            }
            if (positionOnCycle < cycleLength)
            {
                final long cycleId = cycleId(producerCycleClaim, cycleIdBitShift);
                if (cycleId != activeCycleId)
                {
                    validateSlowProducerClaim(cycleId, positionOnCycle, cycleLengthLog2);
                }
                //simplified operation to calculate the offset in the array: activeCycle hasn't changed from the beginning of the offer!
                final int producerOffset = calcElementOffset(activeCycle, positionOnCycle, cycleLengthLog2);
                buffer.lazySet(producerOffset, s.get());
                i++;
            }
            else if (positionOnCycle == cycleLength)
            {
                //is the only one responsible to rotate cycle: the other producers will be forced to wait until rotation got completed to perform a valid offer
                rotateCycle(producerCycleClaims, activeCycle, producerCycleClaim, cycleIdBitShift, maxCycleId);
            }
        }
        return i;
    }

    @Override
    public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
    {
        final AtomicReferenceArray<E> buffer = this.buffer;
        final long mask = this.mask;
        long consumerPosition = lvConsumerPosition();

        int counter = 0;
        while (exit.keepRunning())
        {
            for (int i = 0; i < 4096; i++)
            {
                final int offset = calcElementOffset(consumerPosition, mask);
                final E e = buffer.get(offset);// LoadLoad
                if (null == e)
                {
                    counter = w.idle(counter);
                    continue;
                }
                consumerPosition++;
                counter = 0;
                //a plain store would be enough
                buffer.lazySet(offset, null);
                soConsumerPosition(consumerPosition); // ordered store -> atomic and ordered for size()
                c.accept(e);
            }
        }
    }

    @Override
    public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
    {
        int idleCounter = 0;
        while (exit.keepRunning())
        {
            if (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0)
            {
                idleCounter = w.idle(idleCounter);
                continue;
            }
            idleCounter = 0;
        }
    }

    @Override
    public String toString()
    {
        return this.getClass().getName();
    }
}
